{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# ThreatExchange Data Dashboard\n",
"\n",
"**Purpose**\n",
" \n",
"The ThreatExchange APIs are designed to make consuming threat intelligence from multiple sources easy. This notebook will walk you through:\n",
"\n",
" - building an initial dashboard for assessing the data visible to your appID;\n",
" - filtering down to a subset you consider *high value*; and\n",
" - exporting the high value data to a file.\n",
"\n",
"**What you need**\n",
"\n",
"Before getting started, you'll need a few Python packages installed:\n",
"\n",
" - [Pandas](http://pandas.pydata.org/) for data manipulation and analysis\n",
" - [Pytx](https://pytx.readthedocs.org/en/latest/installation.html) for ThreatExchange access\n",
" - [Seaborn](https://stanford.edu/~mwaskom/software/seaborn/) for making charts pretty\n",
"\n",
"All of the python packages mentioned can be installed via \n",
"\n",
"```\n",
"pip install <package_name>\n",
"```\n",
"\n",
"### Setup a ThreatExchange `access_token`\n",
"\n",
"If you don't already have an `access_token` for your app, use the [Facebook Access Token Tool]( https://developers.facebook.com/tools/accesstoken/) to get one."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from pytx.access_token import access_token\n",
"from pytx.logger import setup_logger\n",
"from pytx.vocabulary import PrivacyType as pt\n",
"\n",
"# Specify the location of your token via one of several ways:\n",
"# https://pytx.readthedocs.org/en/latest/pytx.access_token.html\n",
"access_token()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Optionally, enable debug level logging"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Uncomment this if you want debug logging enabled\n",
"#setup_logger(log_file=\"pytx.log\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Search for data in ThreatExchange\n",
"\n",
"Start by running a query against the ThreatExchange APIs to pull down any/all data relevant to you over a specified period of days."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Our basic search parameters, we default to querying over the past 14 days\n",
"days_back = 14\n",
"search_terms = ['abuse', 'phishing', 'malware', 'exploit', 'apt', 'ddos', 'brute', 'scan', 'cve']"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Next, we execute the query using our search parameters and put the results in a Pandas `DataFrame`"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from datetime import datetime, timedelta\n",
"from time import strftime\n",
"import pandas as pd\n",
"import re\n",
"\n",
"from pytx import ThreatDescriptor\n",
"from pytx.vocabulary import ThreatExchange as te\n",
"\n",
"# Define your search string and other params, see \n",
"# https://pytx.readthedocs.org/en/latest/pytx.common.html#pytx.common.Common.objects\n",
"# for the full list of options\n",
"search_params = {\n",
" te.FIELDS: ThreatDescriptor._default_fields,\n",
" te.LIMIT: 1000,\n",
" te.SINCE: strftime('%Y-%m-%d %H:%m:%S +0000', (datetime.utcnow() + timedelta(days=(-1*days_back))).timetuple()),\n",
" te.TEXT: search_terms,\n",
" te.UNTIL: strftime('%Y-%m-%d %H:%m:%S +0000', datetime.utcnow().timetuple()),\n",
" te.STRICT_TEXT: False\n",
"}\n",
"\n",
"data_frame = None\n",
"for search_term in search_terms:\n",
" print \"Searching for '%s' over -%d days\" % (search_term, days_back)\n",
" results = ThreatDescriptor.objects(\n",
" fields=search_params[te.FIELDS],\n",
" limit=search_params[te.LIMIT],\n",
" text=search_term, \n",
" since=search_params[te.SINCE], \n",
" until=search_params[te.UNTIL],\n",
" strict_text=search_params[te.STRICT_TEXT]\n",
" )\n",
" tmp = pd.DataFrame([result.to_dict() for result in results])\n",
" tmp['search_term'] = search_term\n",
" print \"\\t... found %d descriptors\" % tmp.size\n",
" if data_frame is None:\n",
" data_frame = tmp\n",
" else:\n",
" data_frame = data_frame.append(tmp)\n",
" \n",
"print \"\\nFound %d descriptors in total.\" % data_frame.size"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Do some data munging for easier analysis and then preview as a sanity check"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from time import mktime\n",
"\n",
"# Extract a datetime and timestamp, for easier analysis\n",
"data_frame['ds'] = pd.to_datetime(data_frame.added_on.str[0:10], format='%Y-%m-%d')\n",
"data_frame['ts'] = pd.to_datetime(data_frame.added_on)\n",
"\n",
"# Extract the owner data\n",
"owner = data_frame.pop('owner')\n",
"owner = owner.apply(pd.Series)\n",
"data_frame = pd.concat([data_frame, owner.email, owner.name], axis=1)\n",
"\n",
"# Extract freeform 'tags' in the description\n",
"def extract_tags(text):\n",
" return re.findall(r'\\[([a-zA-Z0-9\\:\\-\\_]+)\\]', text)\n",
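"# e.g. extract_tags('Seen in [apt] campaign [cve-2015-1234]') -> ['apt', 'cve-2015-1234']\n",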
"data_frame['tags'] = data_frame.description.map(lambda x: [] if x is None else extract_tags(x))\n",
"\n",
"data_frame.head(n=5)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Create a Dashboard to Get a High-level View\n",
"\n",
"The raw data is great, but it would be much better if we could take a higher level view of the data. This dashboard will provide more insight into:\n",
"\n",
" - what data is available\n",
" - who's sharing it\n",
" - how is labeled\n",
" - how much of it is likely to be directly applicable for alerting"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import math\n",
"import matplotlib.pyplot as plt\n",
"import seaborn as sns\n",
"\n",
"from pytx.vocabulary import ThreatDescriptor as td\n",
"\n",
"%matplotlib inline\n",
"\n",
"# Setup subplots for our dashboard\n",
"fig, axes = plt.subplots(nrows=4, ncols=2, figsize=(16,32))\n",
"axes[0,0].set_color_cycle(sns.color_palette(\"coolwarm_r\", 15))\n",
"\n",
"# Plot by Type over time\n",
"type_over_time = data_frame.groupby(\n",
" [pd.Grouper(freq='d', key='ds'), te.TYPE]\n",
" ).count().unstack(te.TYPE)\n",
"type_over_time.added_on.plot(\n",
" kind='line', \n",
" stacked=True, \n",
" title=\"Indicator Types Per Day (-\" + str(days_back) + \"d)\",\n",
" ax=axes[0,0]\n",
")\n",
"\n",
"# Plot by threat_type over time\n",
"tt_over_time = data_frame.groupby(\n",
" [pd.Grouper(freq='w', key='ds'), 'threat_type']\n",
" ).count().unstack('threat_type')\n",
"tt_over_time.added_on.plot(\n",
" kind='bar', \n",
" stacked=True, \n",
" title=\"Threat Types Per Week (-\" + str(days_back) + \"d)\",\n",
" ax=axes[0,1]\n",
")\n",
"\n",
"# Plot the top 10 tags\n",
"tags = pd.DataFrame([item for sublist in data_frame.tags for item in sublist])\n",
"tags[0].value_counts().head(10).plot(\n",
" kind='bar', \n",
" stacked=True,\n",
" title=\"Top 10 Tags (-\" + str(days_back) + \"d)\",\n",
" ax=axes[1,0]\n",
")\n",
"\n",
"# Plot by who is sharing\n",
"owner_over_time = data_frame.groupby(\n",
" [pd.Grouper(freq='w', key='ds'), 'name']\n",
" ).count().unstack('name')\n",
"owner_over_time.added_on.plot(\n",
" kind='bar', \n",
" stacked=True, \n",
" title=\"Who's Sharing Each Week? (-\" + str(days_back) + \"d)\",\n",
" ax=axes[1,1]\n",
")\n",
"\n",
"# Plot the data as a timeseries of when it was published\n",
"data_over_time = data_frame.groupby(pd.Grouper(freq='6H', key='ts')).count()\n",
"data_over_time.added_on.plot(\n",
" kind='line',\n",
" title=\"Data shared over time (-\" + str(days_back) + \"d)\",\n",
" ax=axes[2,0]\n",
")\n",
"\n",
"# Plot by status label\n",
"data_frame.status.value_counts().plot(\n",
" kind='pie', \n",
" title=\"Threat Statuses (-\" + str(days_back) + \"d)\",\n",
" ax=axes[2,1]\n",
")\n",
"\n",
"# Heatmap by type / source\n",
"owner_and_type = pd.DataFrame(data_frame[['name', 'type']])\n",
"owner_and_type['n'] = 1\n",
"grouped = owner_and_type.groupby(['name', 'type']).count().unstack('type').fillna(0)\n",
"ax = sns.heatmap(\n",
" data=grouped['n'], \n",
" robust=True,\n",
" cmap=\"YlGnBu\",\n",
" ax=axes[3,0]\n",
")\n",
"\n",
"# These require a little data munging\n",
"# translate a severity enum to a value\n",
"# TODO Add this translation to Pytx\n",
"def severity_value(severity):\n",
" if severity == 'UNKNOWN': return 0\n",
" elif severity == 'INFO': return 1\n",
" elif severity == 'WARNING': return 3\n",
" elif severity == 'SUSPICIOUS': return 5\n",
" elif severity == 'SEVERE': return 7\n",
" elif severity == 'APOCALYPSE': return 10\n",
" return 0\n",
"# translate a severity \n",
"def value_severity(severity):\n",
" if severity >= 9: return 'APOCALYPSE'\n",
" elif severity >= 6: return 'SEVERE'\n",
" elif severity >= 4: return 'SUSPICIOUS'\n",
" elif severity >= 2: return 'WARNING'\n",
" elif severity >= 1: return 'INFO'\n",
" elif severity >= 0: return 'UNKNOWN'\n",
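"# e.g. severity_value('SEVERE') -> 7; value_severity(7) -> 'SEVERE'\n",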
"\n",
"# Plot by how actionable the data is \n",
"# Build a special dataframe and chart it\n",
"data_frame['severity_value'] = data_frame.severity.apply(severity_value)\n",
"df2 = pd.DataFrame({'count' : data_frame.groupby(['name', 'confidence', 'severity_value']).size()}).reset_index()\n",
"ax = df2.plot(\n",
" kind='scatter', \n",
" x='severity_value', y='confidence', \n",
" xlim=(-1,11), ylim=(-10,110), \n",
" title='Data by Conf / Sev With Threshold Line',\n",
" ax=axes[3,1],\n",
"    s=df2['count'].apply(lambda x: 1000 * math.log10(x))\n",
")\n",
"# Draw a threshhold for data we consider likely using for alerts (aka 'high value')\n",
"ax.plot([2,10], [100,0], c='red')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Dive A Little Deeper\n",
"\n",
"Take a subset of the data and understand it a little more. \n",
"\n",
"In this example, we presume that we'd like to take phishing related data and study it, to see if we can use it to better defend a corporate network or abuse in a product. \n",
"\n",
"As a simple example, we'll filter down to data labeled **`MALICIOUS`** and the word **`phish`** in the description, to see if we can make a more detailed conclusion on how to apply the data to our existing internal workflows."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from pytx.vocabulary import Status as s\n",
"\n",
"\n",
"phish_data = data_frame[(data_frame.status == s.MALICIOUS) \n",
" & data_frame.description.apply(lambda x: x.find('phish') if x != None else False)]\n",
"# TODO: also filter for attack_type == PHISHING, when Pytx supports it\n",
"\n",
"%matplotlib inline\n",
"\n",
"# Setup subplots for our deeper dive plots\n",
"fig, axes = plt.subplots(nrows=1, ncols=2, figsize=(16,8))\n",
"\n",
"# Heatmap of type / source\n",
"owner_and_type = pd.DataFrame(phish_data[['name', 'type']])\n",
"owner_and_type['n'] = 1\n",
"grouped = owner_and_type.groupby(['name', 'type']).count().unstack('type').fillna(0)\n",
"ax = sns.heatmap(\n",
" data=grouped['n'], \n",
" robust=True,\n",
" cmap=\"YlGnBu\",\n",
" ax=axes[0]\n",
")\n",
"\n",
"# Tag breakdown of the top 10 tags\n",
"tags = pd.DataFrame([item for sublist in phish_data.tags for item in sublist])\n",
"tags[0].value_counts().head(10).plot(\n",
" kind='pie',\n",
" title=\"Top 10 Tags (-\" + str(days_back) + \"d)\",\n",
" ax=axes[1]\n",
")\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Extract The High Confidence / Severity Data For Use\n",
"\n",
"With a better understanding of the data, let's filter the **`MALICIOUS`**, **`REVIEWED_MANUALLY`** labeled data down to a pre-determined threshold for confidence + severity. \n",
"\n",
"You can add more filters, or change the threshold, as you see fit."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from pytx.vocabulary import ReviewStatus as rs\n",
"\n",
"# define our threshold line, which is the same as the red, threshold line in the chart above\n",
"sev_min = 2\n",
"sev_max = 10\n",
"conf_min= 0\n",
"conf_max = 100\n",
"\n",
"# build a new series, to indicate if a row passes our confidence + severity threshold\n",
"def is_high_value(conf, sev):\n",
" return (((sev_max - sev_min) * (conf - conf_max)) - ((conf_min - conf_max) * (sev - sev_min))) > 0\n",
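"# (a 2D cross-product sign test: positive means the point lies above the threshold\n",
"#  line drawn from (sev_min, conf_max) to (sev_max, conf_min))\n",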
"data_frame['is_high_value']= data_frame.apply(lambda x: is_high_value(x.confidence, x.severity_value), axis=1)\n",
"\n",
"# filter down to just the data passing our criteria, you can add more here to filter by type, source, etc.\n",
"high_value_data = data_frame[data_frame.is_high_value \n",
" & (data_frame.status == s.MALICIOUS)\n",
" & (data_frame.review_status == rs.REVIEWED_MANUALLY)].reset_index(drop=True)\n",
"\n",
"# get a count of how much we kept\n",
"print \"Kept %d of %d data as high value\" % (high_value_data.size, data_frame.size)\n",
"\n",
"# ... and preview it\n",
"high_value_data.head()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now, output all of the high value data to a file as CSV or JSON, for consumption in our other systems and workflows."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"use_csv = False\n",
"\n",
"if use_csv:\n",
" file_name = 'threat_exchange_high_value.csv'\n",
" high_value_data.to_csv(path_or_buf=file_name)\n",
" print \"CSV data written to %s\" % file_name\n",
"else:\n",
" file_name = 'threat_exchange_high_value.json'\n",
" high_value_data.to_json(path_or_buf=file_name, orient='index')\n",
" print \"JSON data written to %s\" % file_name"
]
},
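{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a quick sanity check (a minimal sketch, assuming the export above just ran), re-load the file into a `DataFrame` to confirm the data round-trips cleanly."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Re-load the exported file to verify the round trip\n",
"if use_csv:\n",
"    check = pd.read_csv(file_name)\n",
"else:\n",
"    check = pd.read_json(file_name, orient='index')\n",
"print \"Re-loaded %d rows from %s\" % (len(check), file_name)"
]
},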
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 2",
"language": "python",
"name": "python2"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.10"
}
},
"nbformat": 4,
"nbformat_minor": 0
}